Source code for hysop.fields.continuous_field

# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
Continuous fields description and containers.
* :class:`~hysop.fields.continuous.FieldContainerI`
* :class:`~hysop.fields.continuous.ScalarField`
* :class:`~hysop.fields.continuous.VectorField`
* :class:`~hysop.fields.continuous.TensorField`
"""

import textwrap
import sympy as sm
import numpy as np
from abc import ABCMeta, abstractmethod

from hysop.constants import (
    HYSOP_REAL,
    HYSOP_BOOL,
    BoundaryCondition,
    BoundaryConditionConfig,
    DirectionLabels,
)
from hysop.tools.decorators import debug
from hysop.tools.htypes import check_instance, first_not_None, to_tuple
from hysop.tools.warning import HysopWarning
from hysop.tools.handle import TaggedObject
from hysop.tools.numpywrappers import npw
from hysop.domain.domain import Domain
from hysop.domain.box import BoxBoundaryCondition
from hysop.topology.topology import Topology, TopologyState
from hysop.tools.sympy_utils import (
    nabla,
    partial,
    subscript,
    subscripts,
    exponent,
    exponents,
    xsymbol,
)
from hysop.symbolic import SpaceSymbol
from hysop.tools.interface import (
    NamedObjectI,
    SymbolContainerI,
    NamedScalarContainerI,
    NamedTensorContainerI,
)


[docs] class FieldContainerI(TaggedObject): """Common abstract interface for scalar and tensor-like fields."""
[docs] @debug def __new__( cls, domain, name=None, nb_components=None, shape=None, is_vector=None, **kwds ): """ Create a FieldContainer on a specific domain. domain : domain.Domain Physical domain where this field is defined. kwds: dict Keywords arguments for base class. Notes ----- This can also be used to instantiate a ScalarField, VectorField or TensorField depending on nb_components, shape and is_vector. """ if not issubclass(cls, (ScalarField, VectorField, TensorField)): check_instance(name, str, allow_none=False) if shape is not None: return TensorField(domain=domain, name=name, shape=shape, **kwds) elif (is_vector is True) or ( (nb_components is not None) and (nb_components > 1) ): nb_components = first_not_None(nb_components, domain.dim) assert (is_vector is not True) or (nb_components == domain.dim) return VectorField( domain=domain, name=name, nb_components=nb_components, **kwds ) else: return ScalarField(domain=domain, name=name, **kwds) check_instance(domain, Domain) obj = super().__new__(cls, **kwds) obj._domain = domain obj._dim = int(domain.dim) return obj
@debug def __init__( self, domain, name=None, nb_components=None, shape=None, is_vector=None, **kwds ): super().__init__(**kwds) @property def is_scalar(self): return not self.is_tensor
[docs] @abstractmethod def field_like(self, name, **kwds): """Create a ScalarField like this object, possibly altered.""" pass
[docs] @abstractmethod def tmp_like(self, name, **kwds): """Create a temporary field like self, possibly altered.""" pass
[docs] @abstractmethod def fields(self): """Return all unique scalar fields contained in this field container.""" pass
[docs] @abstractmethod def nb_components(self): """ Total number of components of this field container, exluding None entries, but including duplicate fields. """ pass
[docs] @abstractmethod def discretize(self, topology, topology_state=None): """ Discretization of the field on a given topology. Parameters ---------- topology : :class:`hysop.mpi.core.topology.Topology` The topology on which to discretize this ScalarField. state: :class:`hysop.mpi.core.topology.TopologyState`, optional The topology state on which to discretize this ScalarField. """ pass
[docs] @classmethod def from_sympy_expressions( cls, name, exprs, space_symbols, scalar_name_prefix=None, scalar_pretty_name_prefix=None, pretty_name=None, **kwds, ): """ Create a field wich has the same shape as exprs, with optional names. Expressions should be of kind sympy.Expr and are converted to FieldExpression: this means they all have to contain at least one FieldExpression. Note that field.symbol is always a SymbolicField which is a FieldExpression. FieldExpression make sure boundary conditions match between fields for derivatives and integrations. """ if isinstance(exprs, sm.Expr): raise NotImplementedError("Call self.from_sympy_expression instead.") check_instance(exprs, npw.ndarray, values=sm.Expr) check_instance(name, str) check_instance(pretty_name, str, allow_none=True) check_instance(scalar_name_prefix, str, allow_none=True) check_instance(scalar_pretty_name_prefix, str, allow_none=True) fields = npw.empty(shape=exprs.shape, dtype=object) fields[...] = None for idx in npw.ndindex(*exprs.shape): if scalar_name_prefix is not None: sname = TensorField.default_name_formatter(scalar_name_prefix, idx) if scalar_pretty_name_prefix is not None: spname = TensorField.default_pretty_name_formatter( scalar_pretty_name_prefix, idx ) else: spname = TensorField.default_pretty_name_formatter( scalar_name_prefix, idx ) else: # names will be autogenerated from sympy expression sname = None spname = None fields[idx] = cls.from_sympy_expression( expr=exprs[idx], space_symbols=space_symbols, name=sname, pretty_name=spname, **kwds, ) return TensorField.from_field_array( name=name, pretty_name=pretty_name, fields=fields )
[docs] @classmethod def from_sympy_expression(cls, expr, space_symbols, **kwds): from hysop.symbolic.field import FieldExpressionBuilder from hysop.tools.field_utils import print_all_names assert "lboundaries" not in kwds assert "rboundaries" not in kwds assert "domain" not in kwds # determine names if not given if ("name" not in kwds) or (kwds["name"] is None): (name, pretty_name, var_name, latex_name) = print_all_names(expr) kwds["name"] = name kwds["pretty_name"] = pretty_name kwds["var_name"] = var_name kwds["latex_name"] = latex_name # determine domain and boundary conditions fe = FieldExpressionBuilder.to_field_expression( expr=expr, space_symbols=space_symbols, strict=True ) kwds["domain"] = fe.domain kwds["lboundaries"] = fe.lboundaries kwds["rboundaries"] = fe.rboundaries # deduce data type from field expression if not specified kwds["dtype"] = first_not_None(kwds.get("dtype", None), fe.dtype) # finally return create and return the ScalarField return ScalarField(**kwds)
[docs] def gradient( self, name=None, pretty_name=None, scalar_name_prefix=None, scalar_pretty_name_prefix=None, directions=None, axis=-1, space_symbols=None, dtype=None, **kwds, ): """ Create a field capable of storing the gradient of self, possibly altered. """ dim = self.dim # dimension of the domain ndim = self.ndim # number of dimension of the np.ndarray frame = self.domain.frame directions = to_tuple(first_not_None(directions, range(dim))) space_symbols = to_tuple(first_not_None(space_symbols, frame.coords)) check_instance( directions, tuple, minval=0, maxval=self.dim - 1, minsize=1, unique=True ) check_instance(axis, int, minval=-ndim, maxval=ndim - 1) check_instance(space_symbols, tuple, values=SpaceSymbol, size=dim, unique=True) ndirs = len(directions) if ndim > 0: axis = (axis + ndim) % ndim shape = self.shape[: axis + 1] + (ndirs,) + self.shape[axis + 1 :] else: shape = (ndirs,) name = first_not_None(name, f"grad_{self.name}") pretty_name = first_not_None( pretty_name, "{}{}".format(nabla, self.pretty_name) ) if shape == (1,): expr = self.symbol(frame.time, *space_symbols).diff( space_symbols[directions[0]] ) return self.from_sympy_expression( expr=expr, space_symbols=space_symbols, name=name, pretty_name=pretty_name, dtype=dtype, **kwds, ) else: exprs = npw.empty(shape=shape, dtype=object) for idx in npw.ndindex(*shape): i = idx[: axis + 1] + idx[axis + 2 :] d = directions[idx[axis + 1]] if self.is_tensor: exprs[idx] = ( self[i] .symbol(frame.time, *space_symbols) .diff(space_symbols[d]) ) else: assert i == (), i exprs[idx] = self.symbol(frame.time, *space_symbols).diff( space_symbols[d] ) return self.from_sympy_expressions( exprs=exprs, space_symbols=space_symbols, name=name, pretty_name=pretty_name, scalar_name_prefix=scalar_name_prefix, scalar_pretty_name_prefix=scalar_pretty_name_prefix, dtype=dtype, **kwds, )
[docs] def laplacian( self, name=None, pretty_name=None, scalar_name_prefix=None, scalar_pretty_name_prefix=None, dtype=None, **kwds, ): from hysop.symbolic.field import laplacian frame = self.domain.frame exprs = laplacian(self.symbol(*frame.vars), frame) name = first_not_None(name, f"laplacian_{self.name}") pretty_name = first_not_None(pretty_name, {}".format(self.pretty_name)) if isinstance(exprs, npw.ndarray): if exprs.size == 1: expr = exprs.item() return self.from_sympy_expression( expr=expr, space_symbols=frame.coords, name=name, pretty_name=pretty_name, dtype=dtype, **kwds, ) else: return self.from_sympy_expressions( exprs=exprs, space_symbols=frame.coords, name=name, pretty_name=pretty_name, scalar_name_prefix=scalar_name_prefix, scalar_pretty_name_prefix=scalar_pretty_name_prefix, dtype=dtype, **kwds, ) else: expr = exprs return self.from_sympy_expression( expr=expr, space_symbols=frame.coords, name=name, pretty_name=pretty_name, dtype=dtype, **kwds, )
[docs] def div( self, name=None, pretty_name=None, scalar_name_prefix=None, scalar_pretty_name_prefix=None, axis=-1, dtype=None, **kwds, ): """ Create a field capable of storing the divergence of self, on chosen axis. """ from hysop.symbolic.field import div frame = self.domain.frame exprs = npw.asarray(div(self.symbol(*frame.vars), frame)) name = first_not_None(name, f"div_{self.name}") pretty_name = first_not_None( pretty_name, "{}{}".format(nabla, self.pretty_name) ) if exprs.size in (0, 1): expr = exprs.item() return self.from_sympy_expression( expr=expr, space_symbols=frame.coords, name=name, pretty_name=pretty_name, dtype=dtype, **kwds, ) else: return self.from_sympy_expressions( exprs=exprs, space_symbols=frame.coords, name=name, pretty_name=pretty_name, scalar_name_prefix=scalar_name_prefix, scalar_pretty_name_prefix=scalar_pretty_name_prefix, dtype=dtype, **kwds, )
[docs] def curl( self, name=None, pretty_name=None, scalar_name_prefix=None, scalar_pretty_name_prefix=None, dtype=None, **kwds, ): """ Create a field capable of storing the curl of self, Only 2D and 3D fields are supported as the curl brings a 1-vector to a 2-vector: - A vector to a pseudoscalar or a pseudoscalar to a vector in 2D - A vector to a pseudovector or a pseudovector to a vector in 3D In 1D the curl is 0, and in 4D the curl would be a 6D 'field'. """ from hysop.symbolic.field import curl if self.dim == 2: msg = "Can only take curl for a 2D field with one or two components." assert self.nb_components in (1, 2), msg elif self.dim == 3: msg = "Can only take curl for a 3D field with three components." assert self.nb_components in (3,), msg else: msg = "Can only take curl for a 2D or 3D vector field." assert self.dim in (2, 3), msg frame = self.domain.frame exprs = curl(self.symbol(*frame.vars), frame) name = first_not_None(name, f"curl_{self.name}") pretty_name = first_not_None( pretty_name, "{}{}".format(nabla, self.pretty_name) ) if isinstance(exprs, npw.ndarray): return self.from_sympy_expressions( exprs=exprs, space_symbols=frame.coords, name=name, pretty_name=pretty_name, scalar_name_prefix=scalar_name_prefix, scalar_pretty_name_prefix=scalar_pretty_name_prefix, dtype=dtype, **kwds, ) else: return self.from_sympy_expression( expr=exprs, space_symbols=frame.coords, name=name, pretty_name=pretty_name, dtype=dtype, **kwds, )
[docs] def rot(self, *args, **kwds): """See curl.""" return self.curl(*args, **kwds)
[docs] def get_attributes(self, *attrs): """ Return all matching attributes contained in self.fields, as a tuple. """ objects = () for field in self.fields: obj = field for attr in attrs: obj = getattr(obj, attr) objects += (obj,) return objects
[docs] def has_unique_attribute(self, *attr): """ Return true if all contained fields share the same attribute (as stated by the == comparisson operator). """ def are_equal(a, b): if a is b: return True if type(a) != type(b): return False if isinstance(a, (list, tuple, set, frozenset)): for ai, bi in zip(a, b): if not are_equal(ai, bi): return False return True if isinstance(a, dict): for k in set(a.keys()).union(b.keys()): if (k not in a) or (k not in b): return False ak, bk = a[k], b[k] if not are_equal(ak, bk): return False return True if isinstance(a, npw.ndarray): return npw.array_equal(a, b) return a == b objects = self.get_attributes(*attr) obj0 = objects[0] for obj in objects[1:]: if not are_equal(obj0, obj): return False return True
[docs] def get_unique_attribute(self, *attr): """ Try to return the unique attribute common to all contained fields. Raise an AttributeError if a attribute is not unique accross contained field views. """ if self.has_unique_attribute(*attr): return self.fields[0].get_attributes(*attr)[0] msg = "{} is not unique accross contained fields." msg = msg.format(".".join(str(x) for x in attr)) raise AttributeError(msg)
[docs] def has_unique_dtype(self): """Return true if all contained discrete fields share the same dtype.""" return self.has_unique_attribute("dtype")
[docs] def has_unique_lboundaries(self): """Return true if all contained continuous fields share the same lboundaries.""" return self.has_unique_attribute("lboundaries")
[docs] def has_unique_rboundaries(self): """Return true if all contained continuous fields share the same rboundaries.""" return self.has_unique_attribute("rboundaries")
[docs] def has_unique_boundaries(self): """Return true if all contained continuous fields share the same boundaries.""" return self.has_unique_attribute("boundaries")
[docs] def has_unique_lboundaries_kind(self): """Return true if all contained continuous fields share the same lboundaries kind.""" return self.has_unique_attribute("lboundaries_kind")
[docs] def has_unique_rboundaries_kind(self): """Return true if all contained continuous fields share the same rboundaries kind.""" return self.has_unique_attribute("rboundaries_kind")
[docs] def has_unique_boundaries_kind(self): """Return true if all contained continuous fields share the same boundaries kind.""" return self.has_unique_attribute("boundaries_kind")
[docs] def has_unique_periodicity(self): """Return true if all contained continuous fields share the same periodicity.""" return self.has_unique_attribute("periodicity")
@property def dtype(self): """ Try to return the unique data type common to all contained fields, else raise an AttributeError. """ return self.get_unique_attribute("dtype") @property def lboundaries(self): """ Try to return the unique lboundaries common to all contained fields, else raise an AttributeError. """ return self.get_unique_attribute("lboundaries") @property def rboundaries(self): """ Try to return the unique rboundaries common to all contained fields, else raise an AttributeError. """ return self.get_unique_attribute("rboundaries") @property def boundaries(self): """ Try to return the unique boundaries common to all contained fields, else raise an AttributeError. """ return self.get_unique_attribute("boundaries") @property def lboundaries_kind(self): """ Try to return the unique lboundaries common to all contained fields, else raise an AttributeError. """ return self.get_unique_attribute("lboundaries_kind") @property def rboundaries_kind(self): """ Try to return the unique rboundaries common to all contained fields, else raise an AttributeError. """ return self.get_unique_attribute("rboundaries_kind") @property def boundaries_kind(self): """ Try to return the unique boundaries kind common to all contained fields, else raise an AttributeError. """ return self.get_unique_attribute("boundaries_kind") @property def periodicity(self): """ Try to return the unique periodicity common to all contained fields, else raise an AttributeError. """ return self.get_unique_attribute("periodicity") def __eq__(self, other): return self is other def __ne__(self, other): return self is not other def __hash__(self): return id(self) def _get_domain(self): """Return the physical domain where this field is defined.""" return self._domain def _get_dim(self): """Return the dimension of the physical domain.""" return self._dim domain = property(_get_domain) dim = property(_get_dim)
[docs] class ScalarField(NamedScalarContainerI, FieldContainerI): """ A continuous field is a scalar field defined on a physical domain. This object handles a dictionnary of discrete fields (from 0 to any number). Each discrete field is uniquely defined by the topology used to discretize it. Example ------- If topo1 and topo2 are two hysop.topology.cartesian_topology.Topology instances: scal = ScalarField(domain=dom, name='Scalar') # Discretize the field on the two topologies: scal.discretize(topo1) scal.discretize(topo2) # Access the discrete fields: scal_discr1 = scal.discrete_fields[topo1] scal_discr2 = scal.discrete_fields[topo2] """
[docs] @debug def __new__( cls, domain, name, pretty_name=None, var_name=None, latex_name=None, initial_values=None, dtype=HYSOP_REAL, lboundaries=None, rboundaries=None, is_tmp=False, mem_tag=None, **kwds, ): r""" Create or get an existing continuous ScalarField (scalar or vector) on a specific domain. Parameters ---------- domain : domain.Domain Physical domain where this field is defined. name : string A name for the field. pretty_name: str, optional. A pretty name used for display whenever possible. Defaults to name. var_name: string, optional. A variable name used for code generation. This will be passed to the symbolic representation of this field. latex_name: string, optional. A variable name used for latex generation. This will be passed to the symbolic representation of this field. dtype: npw.dtype, optional, defaults to HYSOP_REAL Underlying data type of this field initial_values: numeric value, or tuple of numeric values, optional Fields are initialized to specified initial value everywhere in the domain on first discretization. The input values are cast to given dtype. If None, leaves the memory uninitialized. If a single value is given, the whole field is initialized to this value, the default being None (ie. no initialization at all). If tuple, computational mesh will be initialized with the first value, and ghosts will be initialized with the second value. lboundaries: array_like of BoundaryCondition or BoundaryConditionConfig, optional Left boundary conditions, defaults to PERIODIC on each axis. rboundaries: array_like of BoundaryCondition or BoundaryConditionConfig, optional Right boundary conditions, defaults to PERIODIC on each axis. is_tmp: bool Specify that this field is a temporary continuous field. Basically a ScalarField that yields a temporary discrete field upon discretization. /!\ ** WARNING *******************************************/!\ TemporaryDiscreteFields are allocated during setup using temporary work buffers. Those work buffers are only available withing the scope of operators thats use this temporary field during their apply method. /!\ ***************************************************** /!\ kwds: dict Base class keyword arguments. Attributes ---------- boundaries: tuple of numpy.ndarray of BoundaryCondition Left and right boundary conditions as a tuple. periodicity: numpy.ndarray of bool Numpy array mask, True is axis is periodic, else False. """ check_instance(name, str) check_instance(pretty_name, str, allow_none=True) check_instance(latex_name, str, allow_none=True) check_instance(var_name, str, allow_none=True) check_instance(is_tmp, bool) if mem_tag is not None: assert is_tmp, "Can only specify mem_tag for temporary fields." # Data type of the field if (dtype == npw.bool_) or (dtype == bool): import warnings msg = "Parameter dtype=npw.bool_ has been converted " msg += f"to HYSOP_BOOL={HYSOP_BOOL.__name__}." warnings.warn(msg, HysopWarning) dtype = HYSOP_BOOL dtype = npw.dtype(dtype) # Name and pretty name pretty_name = first_not_None(pretty_name, name) check_instance(pretty_name, str) # Initial values if not isinstance(initial_values, (list, tuple)): initial_values = (initial_values, initial_values) assert len(initial_values) == 2 initial_values = tuple(initial_values) check_instance(initial_values, tuple, size=2) # Field boundary conditions lboundaries = npw.asarray( first_not_None( lboundaries, cls.default_boundaries_from_domain(domain.lboundaries) ) ) rboundaries = npw.asarray( first_not_None( rboundaries, cls.default_boundaries_from_domain(domain.rboundaries) ) ) check_instance( lboundaries, npw.ndarray, values=(BoundaryCondition, BoundaryConditionConfig), ndim=1, size=domain.dim, dtype=object, allow_none=True, ) check_instance( rboundaries, npw.ndarray, values=(BoundaryCondition, BoundaryConditionConfig), ndim=1, size=domain.dim, dtype=object, allow_none=True, ) assert lboundaries.size == rboundaries.size == domain.dim for i, (lb, rb) in enumerate(zip(lboundaries, rboundaries)): if (lb.bc == BoundaryCondition.PERIODIC) ^ ( rb.bc == BoundaryCondition.PERIODIC ): msg = f"Periodic BoundaryCondition mismatch on axis {i}." raise ValueError(msg) check_instance( lboundaries, npw.ndarray, values=(BoundaryCondition, BoundaryConditionConfig), ndim=1, size=domain.dim, dtype=object, ) check_instance( rboundaries, npw.ndarray, values=(BoundaryCondition, BoundaryConditionConfig), ndim=1, size=domain.dim, dtype=object, ) periodic = BoundaryCondition.PERIODIC periodicity = np.asarray(tuple(map(lambda x: x.bc, lboundaries))) == periodic kwds.pop("make_field", None) obj = super().__new__( cls, domain=domain, name=name, pretty_name=pretty_name, var_name=var_name, latex_name=latex_name, tag_prefix="f", tagged_cls=ScalarField, **kwds, ) obj._dtype = dtype obj._initial_values = initial_values obj._is_tmp = is_tmp obj._mem_tag = mem_tag obj._lboundaries = lboundaries obj._rboundaries = rboundaries obj._periodicity = periodicity # Symbolic representation of this field from hysop.symbolic.field import SymbolicField obj._symbol = SymbolicField(field=obj) # Dictionnary of all the discretizations of this field. # keys are hysop.topology.topology.Topology, # values are hysop.fields.discrete_field.DiscreteField. obj._discrete_fields = {} cls.__check_vars(obj) return obj
def __init__( self, domain, name, pretty_name=None, var_name=None, latex_name=None, initial_values=None, dtype=HYSOP_REAL, lboundaries=None, rboundaries=None, is_tmp=False, mem_tag=None, **kwds, ): kwds.pop("make_field", None) super().__init__( domain=domain, name=name, pretty_name=pretty_name, var_name=var_name, latex_name=latex_name, tag_prefix="f", tagged_cls=ScalarField, **kwds, )
[docs] @classmethod def default_boundaries_from_domain(cls, boundaries): check_instance(boundaries, npw.ndarray, values=BoxBoundaryCondition) field_boundaries = npw.empty_like(boundaries) field_boundaries[...] = None for i, bd in enumerate(boundaries): if bd is BoxBoundaryCondition.PERIODIC: fbd = BoundaryCondition.PERIODIC elif ( bd is BoxBoundaryCondition.SYMMETRIC ): # (normal to boundary velocity = 0) # let any advected scalar to be 0 in boundaries fbd = BoundaryCondition.HOMOGENEOUS_DIRICHLET elif bd is BoxBoundaryCondition.OUTFLOW: # (velocity normal to boundary) # let any advected scalar to go trough the boundary fbd = BoundaryCondition.HOMOGENEOUS_NEUMANN else: msg = "FATAL ERROR: Unknown domain boundary condition {}." msg = msg.format(bd) raise NotImplementedError(msg) field_boundaries[i] = fbd return field_boundaries
@classmethod def __check_vars(cls, obj): """Check properties and types.""" check_instance(obj.dtype, npw.dtype) check_instance(obj.domain, Domain) check_instance(obj.name, str) check_instance(obj.pretty_name, str) check_instance(obj.dim, int, minval=1) check_instance(obj.nb_components, int, minval=1) check_instance(obj.discrete_fields, dict) check_instance(obj.initial_values, tuple, size=2) check_instance( obj.lboundaries, npw.ndarray, values=(BoundaryCondition, BoundaryConditionConfig), ndim=1, size=obj.domain.dim, dtype=object, ) check_instance( obj.rboundaries, npw.ndarray, values=(BoundaryCondition, BoundaryConditionConfig), ndim=1, size=obj.domain.dim, dtype=object, ) check_instance( obj.periodicity, npw.ndarray, dtype=bool, ndim=1, size=obj.domain.dim ) check_instance(obj.is_tmp, bool)
[docs] def field_like( self, name, pretty_name=None, latex_name=None, var_name=None, domain=None, dtype=None, is_tmp=None, lboundaries=None, rboundaries=None, initial_values=None, **kwds, ): """Create a ScalarField like this object, possibly altered.""" check_instance(name, str) domain = first_not_None(domain, self.domain) dtype = first_not_None(dtype, self.dtype) is_tmp = first_not_None(is_tmp, self.is_tmp) lboundaries = first_not_None(lboundaries, self.lboundaries) rboundaries = first_not_None(rboundaries, self.rboundaries) initial_values = first_not_None(initial_values, self.initial_values) return ScalarField( name=name, pretty_name=pretty_name, var_name=var_name, latex_name=latex_name, domain=domain, dtype=dtype, is_tmp=is_tmp, lboundaries=lboundaries, rboundaries=rboundaries, initial_values=initial_values, **kwds, )
[docs] def tmp_like(self, name, **kwds): """Create a TemporaryField like self, possibly altered.""" return self.field_like(name=name, is_tmp=True, **kwds)
[docs] def short_description(self): """Short description of this field.""" s = "{}[pname={}, dim={}, dtype={}, bc=[{}], iv={}]" s = s.format( self.full_tag, self.name, self.dim, self.dtype, self.format_boundaries(), self.initial_values, ) return s
[docs] def format_boundaries(self): from hysop.constants import format_boundaries as fb return fb(*self.boundaries)
[docs] def long_description(self): """Long description of this field.""" s = textwrap.dedent( """ {} *name: {} *pretty_name: {} *var_name: {} *latex_name: {} *dim: {} *dtype: {} *left boundary: {} *right boundary: {} *initial values: {} *topology tags: [{}] """ ).format( self.full_tag, self.name, self.pretty_name, self.var_name, self.latex_name, self.dim, self.dtype, self.lboundaries.tolist(), self.rboundaries.tolist(), self.initial_values, ",".join([k.full_tag for k in self.discrete_fields.keys()]), ) return s[1:]
[docs] @debug def discretize(self, topology, topology_state=None): """ Discretization of the field on a given topology. Parameters ---------- topology : :class:`hysop.mpi.core.topology.Topology` The topology on which to discretize this ScalarField. state: :class:`hysop.mpi.core.topology.TopologyState`, optional The topology state on which to discretize this ScalarField. Returns -------- The discretized field :class:`hysop.fields.discrete_field.DiscreteField`. Note ---- In the case the discretization already exists, simply returns the discretized field. """ from hysop.fields.discrete_field import DiscreteField topology_state = first_not_None(topology_state, topology.default_state()) check_instance(topology, Topology) check_instance(topology_state, TopologyState) if topology not in self.discrete_fields: dfield = topology.discretize(self) check_instance(dfield, DiscreteField) if not self.is_tmp: assert topology in self.discrete_fields assert self.discrete_fields[topology] is dfield else: dfield = self.discrete_fields[topology] return dfield.view(topology_state)
def _get_dtype(self): """Return the default allocation dtype of this ScalarField.""" return self._dtype def _get_initial_values(self): """Return initial value of this field (compute_val, ghost_val).""" return self._initial_values def _get_discrete_fields(self): """ Return the dictionnary containing all the discretizations of this field. """ return self._discrete_fields def _get_lboundaries(self): """Left boundary conditions.""" return self._lboundaries def _get_rboundaries(self): """Right boundary conditions.""" return self._rboundaries def _get_lboundaries_kind(self): """Left boundary condition kind.""" return np.asarray(tuple(map(lambda x: x.bc, self._lboundaries))) def _get_rboundaries_kind(self): """Right boundary condition kind.""" return np.asarray(tuple(map(lambda x: x.bc, self._rboundaries))) def _get_boundaries(self): """Left and right boundary conditions as a tuple.""" return (self._lboundaries, self._rboundaries) def _get_boundaries_kind(self): """Left and right boundary condition kind as a tuple.""" return (self.lboundaries_kind, self._get_lboundaries_kind) def _get_periodicity(self): """Numpy array mask, True is axis is periodic, else False.""" return self._periodicity def _get_is_tmp(self): """Is this ScalarField a temporary field ?""" return self._is_tmp def _get_mem_tag(self): return self._mem_tag dtype = property(_get_dtype) initial_values = property(_get_initial_values) discrete_fields = property(_get_discrete_fields) lboundaries = property(_get_lboundaries) rboundaries = property(_get_rboundaries) boundaries = property(_get_boundaries) lboundaries_kind = property(_get_lboundaries_kind) rboundaries_kind = property(_get_rboundaries_kind) boundaries_kind = property(_get_boundaries_kind) periodicity = property(_get_periodicity) is_tmp = property(_get_is_tmp) mem_tag = property(_get_mem_tag) @property def is_tensor(self): return False @property def fields(self): return (self,) @property def nb_components(self): return 1 def __str__(self): return self.long_description() def __eq__(self, other): return self is other def __ne__(self, other): return self is not other def __hash__(self): return id(self)
[docs] class TensorField(NamedTensorContainerI, FieldContainerI): """ A continuous tensor field is a collection of scalar fields defined on a physical domain, organized as a multi-dimensional array.. This object handles a numpy.ndarray of continuous scalar fields, which may have different attributes (different data types for example). It is mainly a view on scalar fields with the FieldContainerI interface. A tensor field garanties that each different field objects have unique names and pretty names within the tensor field. A given continuous scalar field may appear in at multiple indices (to allow symetric tensor for example). Some components may also be set to None (to allow upper triangular matrices for example). Is also garanties that all fields shares the same domain. """ @property def is_tensor(self): return True def __new__( cls, domain, name, shape, pretty_name=None, name_formatter=None, pretty_name_formatter=None, skip_field=None, make_field=None, fields=None, base_kwds=None, **kwds, ): pretty_name = first_not_None(pretty_name, name) check_instance(name, str) check_instance(pretty_name, str) check_instance(shape, tuple, values=int) if (len(shape) == 1) and not issubclass(cls, VectorField): obj = VectorField( domain=domain, shape=shape, name=name, name_formatter=name_formatter, pretty_name=pretty_name, pretty_name_formatter=pretty_name_formatter, skip_field=skip_field, make_field=make_field, fields=fields, base_kwds=base_kwds, **kwds, ) return obj name_formatter = first_not_None(name_formatter, cls.default_name_formatter) pretty_name_formatter = first_not_None( pretty_name_formatter, cls.default_pretty_name_formatter ) skip_field = first_not_None(skip_field, cls.default_skip_field) make_field = first_not_None(make_field, cls.default_make_field) base_kwds = first_not_None(base_kwds, {}) check_instance(domain, Domain) if npw.prod(shape) <= 0: msg = "Invalid shape for a tensor-like field, got {}." msg = msg.format(shape) raise ValueError(msg) if fields is None: fields = () for idx in npw.ndindex(*shape): if skip_field(idx): field = None else: fname = name_formatter(basename=name, idx=idx) pfname = pretty_name_formatter(basename=pretty_name, idx=idx) field = make_field( idx, domain=domain, name=fname, pretty_name=pfname, **kwds ) fields += (field,) cls._check_fields(*fields) fields = npw.asarray(fields, dtype=object).reshape(shape) else: assert not kwds check_instance(fields, npw.ndarray, dtype=object) assert npw.array_equal(fields.shape, shape) obj = super().__new__( cls, domain=domain, name=name, pretty_name=pretty_name, tag_prefix="tf", tagged_cls=TensorField, contained_objects=fields, **base_kwds, ) obj._fields = fields obj._name_formatter = name_formatter obj._pretty_name_formatter = pretty_name_formatter obj._skip_field = skip_field from hysop.symbolic.field import SymbolicFieldTensor obj._symbol = SymbolicFieldTensor(field=obj) obj._check_domains(domain) obj._check_names() return obj def __init__( self, domain, name, shape, pretty_name=None, name_formatter=None, pretty_name_formatter=None, skip_field=None, make_field=None, fields=None, base_kwds=None, **kwds, ): base_kwds = first_not_None(base_kwds, {}) super().__init__( domain=domain, name=name, pretty_name=pretty_name, tag_prefix="tf", tagged_cls=TensorField, contained_objects=fields, **base_kwds, )
[docs] def discretize(self, topology, topology_state=None): from hysop.fields.discrete_field import DiscreteTensorField dfields = npw.empty(shape=self.shape, dtype=object) for idx, field in self.nd_iter(): dfields[idx] = field.discretize( topology=topology, topology_state=topology_state ) return DiscreteTensorField(field=self, dfields=dfields)
[docs] @classmethod def from_fields(cls, name, fields, shape, pretty_name=None, **kwds): """Create a TensorField from a list of fields and a shape.""" fields = to_tuple(fields) shape = to_tuple(shape) check_instance(fields, tuple, values=(ScalarField,), minsize=1) check_instance(shape, tuple, values=int) check_instance(name, str) check_instance(pretty_name, str, allow_none=True) cls._check_fields(*fields) field0 = first_not_None(*fields) domain = field0.domain fields = npw.asarray(fields, dtype=object).reshape(shape) return Field( domain=domain, name=name, shape=shape, pretty_name=pretty_name, fields=fields, **kwds, )
[docs] @classmethod def from_field_array(cls, name, fields, pretty_name=None, **kwds): """Create a TensorField from numpy.ndarray of fields.""" assert fields.size > 1 check_instance(name, str) check_instance(pretty_name, str, allow_none=True) check_instance(fields, npw.ndarray, dtype=object, values=ScalarField) shape = fields.shape _fields = tuple(fields.ravel().tolist()) cls._check_fields(*_fields) field0 = first_not_None(*_fields) domain = field0.domain return Field( domain=domain, name=name, pretty_name=pretty_name, shape=shape, fields=fields, **kwds, )
@classmethod def _check_fields(cls, *fields): """Check that at least one field is specified.""" field0 = first_not_None(*fields) if field0 is None: msg = "Tensor field {} should at least contain a valid ScalarField." msg = msg.format(field0.name) raise ValueError(msg)
[docs] @classmethod def default_name_formatter(cls, basename, idx): assert len(basename) > 0 if basename[-1] in "0123456789": sep = "_" else: sep = "" name = basename + sep + "_".join(str(i) for i in idx) return name
[docs] @classmethod def default_pretty_name_formatter(cls, basename, idx): check_instance(basename, str) assert len(basename) > 0 pname = basename + subscripts(ids=idx, sep="") return pname
[docs] @classmethod def default_make_field(cls, idx, **kwds): return ScalarField(**kwds)
[docs] @classmethod def default_skip_field(cls, idx): return False
def _check_domains(self, domain): """Check that fields share a unique domain.""" for field in self: check_instance(field, ScalarField) if field.domain.domain is not domain: msg = f"Domain mismatch for field {field.name}." raise ValueError(msg) def _check_names(self): """Check that fields names are unique.""" names = {} pnames = {} for field in self: name = field.name pname = field.pretty_name if (name in names) and (names[name] is not field): msg = "Name {} was already used by another field." msg = msg.format(name) raise ValueError(msg) if (pname in pnames) and (pnames[pname] is not field): msg = "Name {} was already used by another field." msg = msg.format(pname) raise ValueError(msg) names[name] = field pnames[name] = field @property def fields(self): """Return all unique scalar fields contained in this field-like interface.""" ordered_fields = self._fields.ravel().tolist() fields = set(ordered_fields) fields.discard(None) # keep field ordering unlike using a set unique_fields = () for field in ordered_fields: if (field in fields) and (field not in unique_fields): unique_fields += (field,) return unique_fields @property def nb_components(self): return len(self.fields)
[docs] def short_description(self): """Short description of this tensor field.""" s = "{}[name={}, pname={}, dim={}, shape={}]" s = s.format(self.full_tag, self.name, self.pretty_name, self.dim, self.shape) return s
[docs] def long_description(self): """Long description of this tensor field as a string.""" s = textwrap.dedent( """ {} *name: {} *pretty_name: {} *dim: {} *shape: {} *nb_components: {} *symbolic repr.: """.format( self.full_tag, self.name, self.pretty_name, self.dim, self.shape, self.nb_components, ) )[1:] s += " " + "\n ".join(str(self.symbol).split("\n")) return s
[docs] def field_like( self, name, pretty_name=None, shape=None, nb_components=None, fn="field_like", **kwds, ): """Create a TensorField like this object, possibly altered.""" if (shape is None) and (nb_components is not None): shape = (nb_components,) del nb_components shape = first_not_None(shape, self.shape) nb_components = npw.prod(shape, dtype=npw.int64) pretty_name = first_not_None(pretty_name, name) check_instance(name, str) check_instance(pretty_name, str) if nb_components == 1: return getattr(self.fields[0], fn)( name=name, pretty_name=pretty_name, **kwds ) else: fields = npw.empty(shape=shape, dtype=object) if self.shape == shape: for idx, field in self.nd_iter(): fname = self._name_formatter(basename=name, idx=idx) pfname = self._pretty_name_formatter(basename=pretty_name, idx=idx) fields[idx] = getattr(field, fn)( name=fname, pretty_name=pfname, **kwds ) else: field = self.fields[0] for idx in npw.ndindex(*shape): fname = self._name_formatter(basename=name, idx=idx) pfname = self._pretty_name_formatter(basename=pretty_name, idx=idx) fields[idx] = getattr(field, fn)( name=fname, pretty_name=pfname, **kwds ) return self.from_field_array( name=name, pretty_name=pretty_name, fields=fields )
[docs] def tmp_like(self, name, **kwds): """Create a temporary field like self, possibly altered.""" return self.field_like(name=name, fn="tmp_like", **kwds)
def __getitem__(self, slc): fields = self._fields.__getitem__(slc) if isinstance(fields, ScalarField): return fields else: name = f"{self.name}_{str(slc)}" pretty_name = f"{self.pretty_name}_{str(slc)}" return self.from_field_array( name=name, pretty_name=pretty_name, fields=fields )
[docs] class VectorField(TensorField): """Specialization for vector fields, ie. a TensorField with ndim=1.""" def __new__(cls, domain, name, nb_components=None, shape=None, **kwds): assert (nb_components is None) ^ (shape is None) if shape is None: check_instance(nb_components, int, minval=1) shape = (nb_components,) check_instance(shape, tuple, values=int, size=1) if shape[0] == 1: return ScalarField(domain=domain, name=name, **kwds) obj = super().__new__(cls, domain=domain, name=name, shape=shape, **kwds) return obj def __init__(self, domain, name, nb_components=None, shape=None, **kwds): super().__init__(domain=domain, name=name, shape=shape, **kwds)
[docs] @classmethod def default_name_formatter(cls, basename, idx): assert len(basename) > 0 if basename[-1] in "0123456789": sep = "_" else: sep = "" if len(idx) == 1: name = basename + sep + "_".join(DirectionLabels[i] for i in idx) else: name = basename + sep + "_".join(str(i) for i in idx) return name
Field = FieldContainerI """A Field is just a alias of FieldContainerI"""